25.6 智能维护

25 分钟阅读

智能维护概述#

Claude Code 可以自动化维护任务、监控系统健康状态、预测潜在问题,并提供维护优化建议。通过智能维护功能,开发者可以降低维护成本、提高系统可靠性。

自动化维护#

1. 依赖管理#

示例:自动化依赖管理

用户请求: "为项目实现自动化依赖管理"

Claude Code 生成的代码:

requirements.txt#

bash
Flask-SQLAlchemy==3.0.5
Flask-JWT-Extended==4.5.2
Flask-Mail==0.9.1
Flask-Migrate==4.0.5
gunicorn==21.2.0
psycopg2-binary==2.9.7
redis==4.6.0
celery==5.3.4
python-dotenv==1.0.0
```
## requirements-dev.txt

```
pytest==7.4.2
pytest-cov==4.1.0
pytest-mock==3.11.1
black==23.9.1
flake8==6.1.0
mypy==1.5.1
isort==5.12.0
pre-commit==3.4.0
## 依赖更新脚本
````python
`python

#!/usr/bin/env python3
"""
依赖更新脚本
自动检查并更新过期的依赖包
"""

import subprocess
import json
from typing import List, Dict
import sys

def get_outdated_packages() -> List[Dict]:
"""获取过期的包"""
result = subprocess.run(
['pip', 'list', '--outdated', '--format=json'],
capture_output=True,
text=True
)

if result.returncode != 0:
print("Error: Failed to get outdated packages")
return []

packages = json.loads(result.stdout)
return packages

def check_compatibility(package: str, version: str) -> bool:
"""检查包的兼容性"""
try:
result = subprocess.run(
['pip', 'install', '--dry-run', f'{package}=={version}'],
capture_output=True,
text=True
)

return result.returncode == 0
except Exception as e:
print(f"Error checking compatibility for {package}: {e}")
return False

def update_package(package: str, version: str) -> bool:
"""更新包"""
try:
result = subprocess.run(
['pip', 'install', '--upgrade', f'{package}=={version}'],
capture_output=True,
text=True
)

return result.returncode == 0
except Exception as e:
print(f"Error updating {package}: {e}")
return False

def update_requirements():
"""更新 requirements.txt"""
try:
result = subprocess.run(
['pip', 'freeze'],
capture_output=True,
text=True
)

with open('requirements.txt', 'w') as f:
f.write(result.stdout)

return True
except Exception as e:
print(f"Error updating requirements.txt: {e}")
return False

def main():
"""主函数"""
print("检查过期依赖...")

outdated_packages = get_outdated_packages()

if not outdated_packages:
print("所有依赖都是最新的!")
return

print(f"\n发现 {len(outdated_packages)} 个过期包:")
for pkg in outdated_packages:
print(f"  - {pkg['name']}: {pkg['version']} -> {pkg['latest_version']}")

print("\n检查兼容性...")
compatible_packages = []

for pkg in outdated_packages:
package_name = pkg['name']
latest_version = pkg['latest_version']

if check_compatibility(package_name, latest_version):
compatible_packages.append(pkg)
print(f"  ✓ {package_name} {latest_version}")
else:
print(f"  ✗ {package_name} {latest_version} (不兼容)")

if not compatible_packages:
print("\n没有可更新的包")
return

print(f"\n准备更新 {len(compatible_packages)} 个包...")
response = input("是否继续?(y/n): ")

if response.lower() != 'y':
print("取消更新")
return

print("\n开始更新...")
for pkg in compatible_packages:
package_name = pkg['name']
latest_version = pkg['latest_version']

print(f"更新 {package_name} 到 {latest_version}...")
if update_package(package_name, latest_version):
print(f"  ✓ {package_name} 更新成功")
else:
print(f"  ✗ {package_name} 更新失败")

print("\n更新 requirements.txt...")
if update_requirements():
print("  ✓ requirements.txt 更新成功")
else:
print("  ✗ requirements.txt 更新失败")

print("\n依赖更新完成!")

if __name__ == '__main__':
main()

```## 依赖安全检查

```
python

#!/usr/bin/env python3
"""
依赖安全检查脚本
检查依赖包的安全漏洞
"""

import subprocess
import json
from typing import List, Dict

def check_security() -> List[Dict]:
    """检查依赖安全性"""
    try:
        result = subprocess.run(
            ['pip', 'audit'],
            capture_output=True,
            text=True
        )

        if result.returncode != 0:
            print("Error: Failed to check security")
            return []

        return json.loads(result.stdout)
    except Exception as e:
        print(f"Error checking security: {e}")
        return []

def main():
    """主函数"""
    print("检查依赖安全性...")

    vulnerabilities = check_security()

    if not vulnerabilities:
        print("未发现安全漏洞!")
        return

    print(f"\n发现 {len(vulnerabilities)} 个安全漏洞:")
    for vuln in vulnerabilities:
        print(f"\n  包: {vuln['name']}")
        print(f"  版本: {vuln['version']}")
        print(f"  漏洞: {vuln['advisory']}")
        print(f"  严重性: {vuln['severity']}")
        print(f"  修复版本: {vuln['fix_versions']}")

if __name__ == '__main__':
    main()

### 2. 数据库维护

```
python

# 示例:自动化数据库维护

用户请求:
"为应用实现自动化数据库维护"

Claude Code 生成的代码:

```python
```python

from datetime import datetime, timedelta
from sqlalchemy import text
from app import db
import logging

logger = logging.getLogger(__name__)

class DatabaseMaintenance:
    """数据库维护类"""

    def __init__(self, db_session):
        self.db = db_session

    def analyze_tables(self):
        """分析表"""
        tables = ['users', 'products', 'orders', 'payments']

        for table in tables:
            try:
                result = self.db.execute(text(f"ANALYZE {table}"))
                logger.info(f"Table {table} analyzed successfully")
            except Exception as e:
                logger.error(f"Error analyzing table {table}: {e}")

    def vacuum_tables(self):
        """清理表"""
        tables = ['users', 'products', 'orders', 'payments']

        for table in tables:
            try:
                result = self.db.execute(text(f"VACUUM ANALYZE {table}"))
                logger.info(f"Table {table} vacuumed successfully")
            except Exception as e:
                logger.error(f"Error vacuuming table {table}: {e}")

    def rebuild_indexes(self):
        """重建索引"""
        indexes = [
            'idx_users_username',
            'idx_users_email',
            'idx_orders_user_id',
            'idx_orders_status',
            'idx_products_name'
        ]

        for index in indexes:
            try:
                result = self.db.execute(text(f"REINDEX INDEX {index}"))
                logger.info(f"Index {index} rebuilt successfully")
            except Exception as e:
                logger.error(f"Error rebuilding index {index}: {e}")

    def clean_old_logs(self, days=30):
        """清理旧日志"""
        cutoff_date = datetime.utcnow() - timedelta(days=days)

        try:
            result = self.db.execute(
                text("""
                    DELETE FROM logs
                    WHERE created_at < :cutoff_date
                """),
                {'cutoff_date': cutoff_date}
            )

            deleted_count = result.rowcount
            logger.info(f"Deleted {deleted_count} old log entries")

            self.db.commit()

            return deleted_count
        except Exception as e:
            logger.error(f"Error cleaning old logs: {e}")
            self.db.rollback()
            return 0

    def clean_old_sessions(self, days=7):
        """清理旧会话"""
        cutoff_date = datetime.utcnow() - timedelta(days=days)

        try:
            result = self.db.execute(
                text("""
                    DELETE FROM sessions
                    WHERE expires_at < :cutoff_date
                """),
                {'cutoff_date': cutoff_date}
            )

            deleted_count = result.rowcount
            logger.info(f"Deleted {deleted_count} old sessions")

            self.db.commit()

            return deleted_count
        except Exception as e:
            logger.error(f"Error cleaning old sessions: {e}")
            self.db.rollback()
            return 0

    def optimize_database(self):
        """优化数据库"""
        try:
            self.analyze_tables()
            self.vacuum_tables()
            self.rebuild_indexes()
            logger.info("Database optimization completed successfully")
        except Exception as e:
            logger.error(f"Error optimizing database: {e}")

    def get_database_stats(self):
        """获取数据库统计信息"""
        stats = {}

        try:
            # 表大小
            result = self.db.execute(text("""
                SELECT
                    schemaname,
                    tablename,
                    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
                FROM pg_tables
                WHERE schemaname = 'public'
                ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
            """))

            stats['table_sizes'] = [
                {
                    'schema': row[0],
                    'table': row[1],
                    'size': row[2]
                }
                for row in result
            ]

            # 索引使用情况
            result = self.db.execute(text("""
                SELECT
                    schemaname,
                    tablename,
                    indexname,
                    idx_scan,
                    idx_tup_read,
                    idx_tup_fetch
                FROM pg_stat_user_indexes
                ORDER BY idx_scan DESC
            """))

            stats['index_usage'] = [
                {
                    'schema': row[0],
                    'table': row[1],
                    'index': row[2],
                    'scans': row[3],
                    'tuples_read': row[4],
                    'tuples_fetched': row[5]
                }
                for row in result
            ]

            # 慢查询
            result = self.db.execute(text("""
                SELECT
                    query,
                    calls,
                    total_time,
                    mean_time,
                    max_time
                FROM pg_stat_statements
                ORDER BY mean_time DESC
                LIMIT 10
            """))

            stats['slow_queries'] = [
                {
                    'query': row[0],
                    'calls': row[1],
                    'total_time': row[2],
                    'mean_time': row[3],
                    'max_time': row[4]
                }
                for row in result
            ]

            return stats
        except Exception as e:
            logger.error(f"Error getting database stats: {e}")
            return stats

def run_maintenance():
    """运行维护任务"""
    maintenance = DatabaseMaintenance(db.session)

    print("开始数据库维护...")

    # 优化数据库
    print("优化数据库...")
    maintenance.optimize_database()

    # 清理旧数据
    print("清理旧日志...")
    maintenance.clean_old_logs(days=30)

    print("清理旧会话...")
    maintenance.clean_old_sessions(days=7)

    # 获取统计信息
    print("获取数据库统计信息...")
    stats = maintenance.get_database_stats()

    print("\n数据库统计信息:")
    print(f"  表数量: {len(stats['table_sizes'])}")
    print(f"  索引数量: {len(stats['index_usage'])}")
    print(f"  慢查询数量: {len(stats['slow_queries'])}")

    print("\n数据库维护完成!")

if __name__ == '__main__':
    run_maintenance()

## 系统监控
### 1. 性能监控

```
python

# 示例:实现性能监控

用户请求:
"为应用实现性能监控"

Claude Code 生成的代码:

```python
```python

import time
import psutil
from functools import wraps
from flask import request, g
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import logging

logger = logging.getLogger(__name__)

# Prometheus 指标
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

active_connections = Gauge(
    'active_connections',
    'Number of active connections'
)

memory_usage = Gauge(
    'memory_usage_bytes',
    'Memory usage in bytes'
)

cpu_usage = Gauge(
    'cpu_usage_percent',
    'CPU usage percentage'
)

disk_usage = Gauge(
    'disk_usage_percent',
    'Disk usage percentage'
)

def track_request_metrics(f):
    """跟踪请求指标"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()

        # 记录请求开始
        g.start_time = start_time

        try:
            response = f(*args, **kwargs)
            status_code = response.status_code if hasattr(response, 'status_code') else 200

            # 记录请求计数
            request_count.labels(
                method=request.method,
                endpoint=request.endpoint or 'unknown',
                status=status_code
            ).inc()

            # 记录请求持续时间
            duration = time.time() - start_time
            request_duration.labels(
                method=request.method,
                endpoint=request.endpoint or 'unknown'
            ).observe(duration)

            return response
        except Exception as e:
            # 记录错误
            request_count.labels(
                method=request.method,
                endpoint=request.endpoint or 'unknown',
                status=500
            ).inc()
            raise

    return decorated_function

def update_system_metrics():
    """更新系统指标"""
    # 内存使用
    memory = psutil.virtual_memory()
    memory_usage.set(memory.used)

    # CPU 使用
    cpu_usage.set(psutil.cpu_percent())

    # 磁盘使用
    disk = psutil.disk_usage('/')
    disk_usage.set(disk.percent)

    # 活跃连接
    active_connections.set(len(psutil.net_connections()))

class PerformanceMonitor:
    """性能监控类"""

    def __init__(self, app):
        self.app = app
        self.metrics = {}

    def track_function(self, name):
        """跟踪函数性能"""
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
bash
            start_time = time.time()

            try:
                result = f(*args, **kwargs)
                duration = time.time() - start_time

                self.record_metric(name, duration, success=True)

                return result
            except Exception as e:
                duration = time.time() - start_time

                self.record_metric(name, duration, success=False)

                logger.error(f"Error in {name}: {e}")
                raise

        return decorated_function
    return decorator

def record_metric(self, name, duration, success=True):
    """记录指标"""
    if name not in self.metrics:
        self.metrics[name] = {
            'count': 0,
            'total_duration': 0,
            'success_count': 0,
            'error_count': 0,
            'min_duration': float('inf'),
            'max_duration': 0
        }

    metric = self.metrics[name]
    metric['count'] += 1
    metric['total_duration'] += duration

    if success:
        metric['success_count'] += 1
    else:
        metric['error_count'] += 1

    metric['min_duration'] = min(metric['min_duration'], duration)
    metric['max_duration'] = max(metric['max_duration'], duration)

def get_metrics(self):
    """获取指标"""
    for name, metric in self.metrics.items():
        if metric['count'] > 0:
            metric['avg_duration'] = metric['total_duration'] / metric['count']
            metric['success_rate'] = metric['success_count'] / metric['count']

    return self.metrics

def get_slow_functions(self, threshold=1.0):
    """获取慢函数"""
    slow_functions = []

    for name, metric in self.metrics.items():
        if metric['count'] > 0:
            avg_duration = metric['total_duration'] / metric['count']
            if avg_duration > threshold:
                slow_functions.append({
                    'name': name,
                    'avg_duration': avg_duration,
                    'count': metric['count'],
                    'max_duration': metric['max_duration']
                })

    return sorted(slow_functions, key=lambda x: x['avg_duration'], reverse=True)

def setup_monitoring(app): """设置监控"""

bash
@app.route('/metrics')
def metrics():
    """Prometheus 指标端点"""
    return generate_latest()

@app.before_request
def before_request():
    """请求前处理"""
    g.start_time = time.time()
    active_connections.inc()

@app.after_request
def after_request(response):
    """请求后处理"""
    if hasattr(g, 'start_time'):
        duration = time.time() - g.start_time

        request_count.labels(
            method=request.method,
            endpoint=request.endpoint or 'unknown',
            status=response.status_code
        ).inc()

        request_duration.labels(
            method=request.method,
            endpoint=request.endpoint or 'unknown'
        ).observe(duration)

    active_connections.dec()

    return response

# 定期更新系统指标
def update_metrics():
    while True:
        update_system_metrics()
        time.sleep(5)

import threading
thread = threading.Thread(target=update_metrics, daemon=True)
thread.start()

return app

2. 错误监控#

python
````python # 示例:实现错误监控 用户请求: "为应用实现错误监控" Claude Code 生成的代码: ```python ```python import logging import traceback from datetime import datetime from typing import Dict, List from flask import request, g import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration logger = logging.getLogger(__name__) class ErrorMonitor: """错误监控类""" def __init__(self, app): self.app = app self.errors = [] self.error_stats = {} def capture_exception(self, exception, context=None): """捕获异常""" error_data = { 'type': type(exception).__name__, 'message': str(exception), 'traceback': traceback.format_exc(), 'timestamp': datetime.utcnow().isoformat(), 'context': context or {} } # 添加请求信息 if request: error_data['request'] = { 'method': request.method, 'path': request.path, 'url': request.url, 'ip': request.remote_addr, 'user_agent': request.user_agent.string } # 添加用户信息 if hasattr(g, 'user_id'): error_data['user_id'] = g.user_id self.errors.append(error_data) # 更新统计 error_type = error_data['type'] if error_type not in self.error_stats: self.error_stats[error_type] = { 'count': 0, 'last_occurrence': None } self.error_stats[error_type]['count'] += 1 self.error_stats[error_type]['last_occurrence'] = error_data['timestamp'] # 记录日志 logger.error( f"Exception captured: {error_type}", extra=error_data ) # 发送到 Sentry sentry_sdk.capture_exception(exception) def get_errors(self, limit=100): """获取错误列表""" return self.errors[-limit:] def get_error_stats(self): """获取错误统计""" return self.error_stats def get_frequent_errors(self, threshold=10): """获取频繁错误""" frequent_errors = [] for error_type, stats in self.error_stats.items(): if stats['count'] >= threshold: frequent_errors.append({ 'type': error_type, 'count': stats['count'], 'last_occurrence': stats['last_occurrence'] }) return sorted(frequent_errors, key=lambda x: x['count'], reverse=True) def clear_errors(self): """清除错误""" self.errors = [] self.error_stats = {} def setup_error_monitoring(app, sentry_dsn): """设置错误监控""" # 初始化 Sentry sentry_sdk.init( dsn=sentry_dsn, integrations=[FlaskIntegration()], traces_sample_rate=1.0, profiles_sample_rate=1.0 ) error_monitor = ErrorMonitor(app) @app.errorhandler(Exception) def handle_exception(e): """处理异常""" error_monitor.capture_exception(e) if request.is_json: return {'error': str(e)}, 500 else: return str(e), 500 @app.errorhandler(404) def handle_not_found(e): """处理 404""" logger.warning(f"404 Not Found: {request.path}") return {'error': 'Not found'}, 404 @app.errorhandler(500) def handle_server_error(e): """处理 500""" error_monitor.capture_exception(e) return {'error': 'Internal server error'}, 500 @app.route('/admin/errors') def get_errors(): """获取错误列表""" errors = error_monitor.get_errors() return {'errors': errors} @app.route('/admin/errors/stats') def get_error_stats(): """获取错误统计""" stats = error_monitor.get_error_stats() return {'stats': stats} @app.route('/admin/errors/frequent') def get_frequent_errors(): """获取频繁错误""" frequent_errors = error_monitor.get_frequent_errors() return {'frequent_errors': frequent_errors} return app ```## 预测性维护 ### 1. 容量预测 # 示例:实现容量预测 用户请求: "为应用实现容量预测" Claude Code 生成的代码: ````python `python import numpy as np from datetime import datetime, timedelta from typing import List, Dict import logging logger = logging.getLogger(__name__) class CapacityPredictor: """容量预测器""" def __init__(self): self.history = [] self.predictions = {} def add_metric(self, timestamp: datetime, metric_name: str, value: float): """添加指标""" self.history.append({ 'timestamp': timestamp, 'metric': metric_name, 'value': value }) def predict_capacity(self, metric_name: str, days: int = 7) -> List[Dict]: """预测容量""" # 获取历史数据 data = [ entry for entry in self.history if entry['metric'] == metric_name ] if len(data) < 30: logger.warning(f"Insufficient data for {metric_name}") return [] # 提取值 values = [entry['value'] for entry in data] # 计算趋势 trend = self._calculate_trend(values) # 预测未来值 predictions = [] for i in range(days): predicted_value = values[-1] + trend * (i + 1) predicted_date = datetime.utcnow() + timedelta(days=i + 1) predictions.append({ 'date': predicted_date.isoformat(), 'value': predicted_value, 'metric': metric_name }) self.predictions[metric_name] = predictions return predictions def _calculate_trend(self, values: List[float]) -> float: """计算趋势""" if len(values) < 2: return 0 # 使用线性回归 x = np.arange(len(values)) y = np.array(values) # 计算斜率 slope = np.polyfit(x, y, 1)[0] return slope def check_capacity_alerts(self, threshold: float = 0.9) -> List[Dict]: """检查容量告警""" alerts = [] for metric_name, predictions in self.predictions.items(): for prediction in predictions: if prediction['value'] >= threshold: alerts.append({ 'metric': metric_name, 'date': prediction['date'], 'value': prediction['value'], 'threshold': threshold }) return sorted(alerts, key=lambda x: x['value'], reverse=True) def get_capacity_recommendations(self) -> List[Dict]: """获取容量建议""" recommendations = [] alerts = self.check_capacity_alerts() if alerts: recommendations.append({ 'type': 'scale_up', 'message': f"发现 {len(alerts)} 个容量告警,建议扩容", 'alerts': alerts }) # 检查资源利用率 for metric_name, predictions in self.predictions.items(): avg_value = np.mean([p['value'] for p in predictions]) if avg_value < 0.3: recommendations.append({ 'type': 'scale_down', 'message': f"{metric_name} 平均利用率较低,建议缩容", 'metric': metric_name, 'avg_value': avg_value }) return recommendations def run_capacity_prediction(): """运行容量预测""" predictor = CapacityPredictor() # 添加历史数据(示例) now = datetime.utcnow() for i in range(30): timestamp = now - timedelta(days=30 - i) value = 0.5 + (i / 100) + np.random.normal(0, 0.05) predictor.add_metric(timestamp, 'cpu_usage', value) # 预测容量 print("预测 CPU 使用率...") predictions = predictor.predict_capacity('cpu_usage', days=7) print("\n预测结果:") for prediction in predictions: print(f" {prediction['date']}: {prediction['value']:.2%}") # 检查告警 print("\n检查容量告警...") alerts = predictor.check_capacity_alerts(threshold=0.9) if alerts: print(f"发现 {len(alerts)} 个告警:") for alert in alerts: print(f" {alert['date']}: {alert['value']:.2%} (阈值: {alert['threshold']:.0%})") else: print("未发现容量告警") # 获取建议 print("\n获取容量建议...") recommendations = predictor.get_capacity_recommendations() for rec in recommendations: print(f" {rec['type']}: {rec['message']}") if __name__ == '__main__': run_capacity_prediction() ```

标记本节教程为已读

记录您的学习进度,方便后续查看。